Zurück zum Artikel
Biblioteken
Notizbuch herunterladen

Biblioteken

In [1]:
import numpy as np
import wave
from scipy.io import wavfile
import matplotlib.pyplot as plt
from scipy.fftpack import fft

# pynq Overlay
from pynq import Overlay
from pynq import allocate

# audio Codec driver module
from pynq.lib.audio import AudioADAU1761

Funktionen

In [2]:
# Funktionen
def FormatChange(x):
    x = x * (2**15)                            # Skalieren auf Q16.16 Bereich (optional leicht unter max)
    x = x.astype(np.int32)                     # Als 32-Bit Integer interpretieren
    input_data = x.view(np.uint32)             # Für DMA als unsigned darstellen
    return input_data

# Normierung Wichtig für .wav-Files!
def Normierung(x):
    m = np.max(np.abs(x))                      # Maxwert für Normalisierung
    x_n = x / m 
    return x_n

def Transmission(input_data,ip_buffer):
    # Festlegen der Größen
    buffer_size = int(ip_buffer)
    # print("Buffer Size: ", buffer_size)
    input_data = FormatChange(Normierung(input_data))
    # input_data = FormatChange(input_data)
    data_size = int(len(input_data))
    print('Data Size: ', data_size)
    
    # Padding
    pad = np.zeros(ip_buffer)
    pad_frame = FormatChange(pad)
    # print('Frame Length: ', len(pad_frame),' / ', 'Frame Type: ', type(pad_frame))
    
    # Leere Buffer
    input_buffer = allocate(shape=(buffer_size,), dtype=np.uint32)
    output_buffer = allocate(shape=(buffer_size,), dtype=np.uint32)
    
    # Padding Inputbuffer
    input_buffer[:] = pad_frame
    
    # Laden der Daten in Inputbuffer
    input_buffer[: data_size] = input_data
    # print('Input Buffer: ', input_buffer[: data_size])
    
    # Senden un Empfangen der Daten
    dma.sendchannel.transfer(input_buffer)
    dma.recvchannel.transfer(output_buffer)
    dma.sendchannel.wait()
    dma.recvchannel.wait()
    
    # check status
    #print("Recv Status: ","Error: ", dma_recv.error, "Idle: ", dma_recv.idle, "Running: ", dma_recv.running)
    #print("Send Status: ","Error: ", dma_send.error, "Idle: ", dma_send.idle, "Running: ", dma_send.running)
    
    # print('Output Buffer: ', output_buffer[: data_size])
    
    # Check for Error
    if dma_recv.error == False and dma_send.error == False:
        print('>>>> Transmission successful <<<<')
    elif dma_recv.error == True or dma_send.error == True:
        print('!!!>> Error in Transmission <<!!!')
    
    # Umrechnen der Empfangenen Daten
    output_data = np.array(output_buffer[: data_size]).view(np.int32)  # zurück zu signed int32
    # output_data = np.array(output_buffer).view(np.int32)  Test des Kompletten Frame
    y = output_data / (2**15)
    y = np.array(y)   # zu np.array
    y = Normierung(y) # ausgabe normieren
    
    # Buffer leeren
    del input_buffer, output_buffer
    print('Buffer Clear')
    print('>-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-<')
    
    return y
    
def Split2Packets(data,packet_size):
    packets = []
    for i in range(0, len(data), packet_size):
        packet = data[i:i+packet_size]
        packets.append(packet)
    return packets

def send2receive(Data_In):
    ip_buffer = 2**18
    # Filtern Kanal
    Data_Out = []

    # Zerteilung und Übertragung in Packeten
    Packets = Split2Packets(Data_In, ip_buffer)

    for packet in Packets:
        result = Transmission(packet, ip_buffer)
        Data_Out.extend(result)
    return Data_Out

def save_to_24bit_wav(chan_l, chan_r, sample_rate, path):
    # Annahme: frames.shape = (num_frames, num_channels)
    # Typ: float64 in [-1.0, 1.0]
    frames = np.stack((chan_l, chan_r), axis=1) 
    max_val = 2**23 - 1  # 24-bit max signed int
    frames = np.clip(frames, -1.0, 1.0)
    frames_int = (frames * max_val).astype(np.int32)

    # In Bytes umwandeln
    temp_bytes = frames_int.reshape((*frames.shape, 1)).view(np.uint8)
    raw_bytes = temp_bytes[:, :, :3].reshape(-1)

    with wave.open(path, 'wb') as wav_out:
        wav_out.setnchannels(frames.shape[1])
        wav_out.setsampwidth(3)  # 24-bit
        wav_out.setframerate(sample_rate)
        wav_out.writeframes(raw_bytes.tobytes())
        
def read_wav(wav_path):
    with wave.open(wav_path, 'r') as wav_file:
        raw_frames = wav_file.readframes(-1)
        num_frames = wav_file.getnframes()
        num_channels = wav_file.getnchannels()
        sample_rate = wav_file.getframerate()
        sample_width = wav_file.getsampwidth()
    
    temp_buffer = np.empty((num_frames, num_channels, 4), dtype=np.uint8)
    raw_bytes = np.frombuffer(raw_frames, dtype=np.uint8)
    temp_buffer[:, :, :sample_width] = raw_bytes.reshape(-1, num_channels, 
                                                    sample_width)
    temp_buffer[:, :, sample_width:] = \
    (temp_buffer[:, :, sample_width-1:sample_width] >> 7) * 255
    frames = temp_buffer.view('<i4').reshape(temp_buffer.shape[:-1])
    
    print("Frames:",len(frames), "Channels:", num_channels, "Sample Rate:",sample_rate, "Sample Width", sample_width )
    return frames, num_channels, sample_rate, sample_width

Overlay laden

In [3]:
# overlay laden
ol = Overlay("audio_test_v3_ref.bit")
# Check IP names
ol.ip_dict.keys()
dict_keys(['axi_dma_0', 'audio_codec_ctrl_0', 'processing_system7_0'])

Aufnahme

Audio-Codec Treiber laden

In [4]:
# Bezeichnung des Audio Codec Kontrollers
audio_description = ol.ip_dict['audio_codec_ctrl_0']
# Übergabe der Bezeichnung an Treiber
pAudio = AudioADAU1761(audio_description)
# Eintellen des Audiotreibers
pAudio.configure(sample_rate=48000, iic_index=1, uio_name='audio-codec-ctrl')

Lautstärke und Input wählen

Line_IN

In [8]:
# einstellen der Lautsärke
pAudio.set_volume(40)

# Einstellen eingang: LineIn
pAudio.select_line_in()

Microphone

In [5]:
# einstellen der Lautsärke
pAudio.set_volume(30)

# Einstellen eingang: HP/MIC
pAudio.select_microphone()

Aufnahme Starten

In [11]:
# Aufnahme
recTime = 30
pAudio.record(recTime)
# in pAudio.buffer werden die aufnahmen gespreichert
print(pAudio.buffer, type(pAudio.buffer))
[16606874 16640921 16625617 ... 16727143 16749662 16730777] <class 'numpy.ndarray'>

Funktionen nach Aufnahme

Playback

Spielt den Inhalt des Buffers über HP aus

In [12]:
# Buffer ausspielen
pAudio.play()

Speichern

Speichert den Inhalt des Buffers als .wav ab.

In [13]:
pAudio.save("record.wav")

Abspielen der .wav

In [14]:
from IPython.display import Audio as IPAudio
IPAudio("record.wav")

bypass

Gibt die Aufnahme in realtime aus

In [15]:
# Echtzeitausgabe
Time = 60
pAudio.bypass(seconds=Time)

Weiteres

Alle Funktionen sind auf Docs » pynq Package » pynq.lib Package » pynq.lib.audio Module zu finden.
Es wird der Basistreiber aus den Base-Overlay verwendet

Filterung

Die Übergabe wird hier Über PS gemacht, dabei wird die .wav ausgelesen und jeder Kanal einzeln gefiltert bevor diese wieder zusammengesetzt werden
Die filterung ist nicht in Echtzeit. Auch andere Quelle sind in Form einer .wav zulässig, diese sollten aber dem richtigen Format entspechen: 24-Bit Audio + 48kHz Sample Frequenz

Lesen der .wav

Hierfür wurden eigene Funktionen erstellt um die Übersicht zu verbessern. Dabei wird sich an dem Beispiel hier auf dem Board orientiert.

In [15]:
file_name = "record.wav"
[frames, channels, Fs, Fw] = read_wav(file_name)
Frames: 1440000 Channels: 2 Sample Rate: 48000 Sample Width 3

Filtern der Kanäle

Kanäle Aufteilen

In [16]:
# Read Data
data_l = frames[:,0]
data_r = frames[:,1]

Einrichten der Übertragung

In [17]:
# Zuweisung für dma
dma = ol.axi_dma_0
dma_send = ol.axi_dma_0.sendchannel
dma_recv = ol.axi_dma_0.recvchannel
# check status
print("Error: ", dma_recv.error, "Idle: ", dma_recv.idle, "Running: ", dma_recv.running)
print("Error: ", dma_send.error, "Idle: ", dma_send.idle, "Running: ", dma_send.running)
Error:  False Idle:  False Running:  True
Error:  False Idle:  False Running:  True

Senden und Filtern linker Kanal

In [18]:
Data_Out_L = send2receive(data_l)
Data Size:  262144
>>>> Transmission successful <<<<
Buffer Clear
>-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-<
Data Size:  262144
>>>> Transmission successful <<<<
Buffer Clear
>-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-<
Data Size:  262144
>>>> Transmission successful <<<<
Buffer Clear
>-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-<
Data Size:  262144
>>>> Transmission successful <<<<
Buffer Clear
>-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-<
Data Size:  262144
>>>> Transmission successful <<<<
Buffer Clear
>-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-<
Data Size:  129280
>>>> Transmission successful <<<<
Buffer Clear
>-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-<

Senden und Filtern rechter Kanal

In [19]:
Data_Out_R = send2receive(data_r)
Data Size:  262144
>>>> Transmission successful <<<<
Buffer Clear
>-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-<
Data Size:  262144
>>>> Transmission successful <<<<
Buffer Clear
>-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-<
Data Size:  262144
>>>> Transmission successful <<<<
Buffer Clear
>-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-<
Data Size:  262144
>>>> Transmission successful <<<<
Buffer Clear
>-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-<
Data Size:  262144
>>>> Transmission successful <<<<
Buffer Clear
>-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-<
Data Size:  129280
>>>> Transmission successful <<<<
Buffer Clear
>-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-<

Kanäle zusammenfügen und als eine .wav speichern

In [20]:
out_File_name = "filt_rec.wav"
save_to_24bit_wav(Data_Out_L, Data_Out_R, Fs, out_File_name)
In [21]:
from IPython.display import Audio as IPAudio
IPAudio(out_File_name)

Ausgabe über Board

Eine .wav lässt sich sich auch in den Buffer des Codecs laden, sodass über den Ausgang ausgegeben werden kann.
Wichtg: Beide Funktionen dürfen nicht gleichzeitig ausgeführt werden, sonst stützt der Python-Kernel ab!!

In [22]:
pAudio.load(out_File_name)
In [23]:
pAudio.play()